package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo19PageRank {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setAppName("student")
      .setMaster("local")

    val sc = new SparkContext(conf)

    //读取数据
    val pagerank: RDD[String] = sc.textFile("data/pagerank.txt")


    //整理数据，给每个网页一个初始的pr值

    var pageRDD: RDD[(String, List[String], Double)] = pagerank.map(line => {
      val split: Array[String] = line.split(" ")
      //当前网页
      val page: String = split(0)
      //出链列表
      val list: List[String] = split(1).split(",").toList

      (page, list, 1.0)
    })

    //网页编号和出链列表
    val pageListRDD: RDD[(String, List[String])] = pageRDD.map(kv => (kv._1, kv._2))


    //阻尼系数
    val Q = 0.85

    //网页的数量
    val N: Long = pageRDD.count()

    var flag = true

    while (flag) {

      //将网页的pr值平方给出链列表
      val avgPageRDD: RDD[(String, Double)] = pageRDD.flatMap {
        case (page: String, list: List[String], pr: Double) =>

          val avgPr: Double = pr / list.size

          list.map(p => (p, avgPr))
      }

      //计算新的pr值
      var nowPrRDD: RDD[(String, Double)] = avgPageRDD
        .reduceByKey(_ + _)
        .mapValues(pr => (1 - Q) / N + Q * pr) //增加阻尼系数


      //将每个网页的出链列表关联回去
      val joinRDD: RDD[(String, (Double, List[String]))] = nowPrRDD.join(pageListRDD)

      //整理数据
      val resultRDD: RDD[(String, List[String], Double)] = joinRDD.map {
        case (page: String, (pr: Double, list: List[String])) =>
          (page, list, pr)
      }


      /**
        * 计算每个网页新的pr值和上一次pr值，差值的平均值
        *
        */

      //上一次网页的pr值
      val lastPrRDD: RDD[(String, Double)] = pageRDD.map(kv => (kv._1, kv._3))

      //计算网页pr 差值
      val prJoinRDD: RDD[(String, (Double, Double))] = nowPrRDD.join(lastPrRDD)

      val prChaRDD: RDD[Double] = prJoinRDD.map {
        case (page: String, (noePr: Double, lastPr: Double)) =>
          Math.abs(lastPr - noePr)
      }
      //计算差值平均值
      val avgPr: Double = prChaRDD.sum() / prChaRDD.count()

      println("差值平均值：" + avgPr)

      //如果差值平均值小于阈值，跳出循环
      if (avgPr < 0.001) {
        flag = false
      }


      //第二次计算使用第一次的结果
      pageRDD = resultRDD
    }

    pageRDD.foreach(println)

  }

}
